package batch;

import lombok.Data;
import lombok.ToString;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;

import java.util.ArrayList;

/**
 * Batch example that inner-joins two in-memory {@link DataSet}s on their first tuple
 * field and maps every matched pair to a {@link UserInfo} object.
 *
 * <p>The first data set holds {@code (id, name)} tuples and the second holds
 * {@code (id, city)} tuples. Only ids that occur in both inputs produce a result, so
 * the entry with id {@code 4} (present only in the first list) is not emitted.
 */
public class JoinWithObject {

    /**
     * Builds the two sample data sets, joins them on tuple field 0 and prints the
     * resulting {@link UserInfo} records.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        // Obtain the execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Left input: (id, name).
        ArrayList<Tuple2<Integer, String>> list1 = new ArrayList<>();
        list1.add(new Tuple2<>(1, "lily"));
        list1.add(new Tuple2<>(2, "lucy"));
        list1.add(new Tuple2<>(3, "tom"));
        list1.add(new Tuple2<>(4, "jack"));

        // Right input: (id, city). There is no entry for id 4.
        ArrayList<Tuple2<Integer, String>> list2 = new ArrayList<>();
        list2.add(new Tuple2<>(1, "beijing"));
        list2.add(new Tuple2<>(2, "shanghai"));
        list2.add(new Tuple2<>(3, "guangzhou"));

        DataSet<Tuple2<Integer, String>> ds1 = env.fromCollection(list1);
        DataSet<Tuple2<Integer, String>> ds2 = env.fromCollection(list2);

        // Equi-join on field 0 of both inputs; each matched pair is converted to a UserInfo.
        JoinOperator.EquiJoin<Tuple2<Integer, String>, Tuple2<Integer, String>, UserInfo> userObj =
                ds1.join(ds2)
                        .where(0)
                        .equalTo(0)
                        .with(new UserInfoJoinFun());

        // print() is the sink here: it runs the job and writes the records to stdout.
        userObj.print();
    }

    /**
     * Join function that combines a {@code (id, name)} tuple with the matching
     * {@code (id, city)} tuple into a single {@link UserInfo}.
     */
    public static class UserInfoJoinFun
            implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, UserInfo> {

        /**
         * Creates the joined record for one pair of matching tuples.
         *
         * @param user element from the first input; {@code f0} is the id, {@code f1} the name
         * @param city element from the second input; {@code f1} is used as the address
         * @return a {@link UserInfo} holding the id and name of {@code user} and the
         *         second field of {@code city}
         */
        @Override
        public UserInfo join(Tuple2<Integer, String> user, Tuple2<Integer, String> city) throws Exception {
            return new UserInfo(user.f0, user.f1, city.f1);
        }
    }

    /**
     * Result type of the join: a user id together with the user's name and address.
     *
     * <p>Getters, setters, {@code equals}, {@code hashCode} and {@code toString} are
     * generated by Lombok.
     */
    @Data
    @ToString
    public static class UserInfo {
        private Integer userId;
        private String userName;
        private String address;

        /**
         * No-argument constructor. Flink's POJO rules require a public default
         * constructor; without it the type is handled as a generic type and
         * serialized through Kryo instead of the POJO serializer.
         */
        public UserInfo() {
        }

        /**
         * Creates a fully populated record.
         *
         * @param userId   id of the user
         * @param userName name of the user
         * @param address  address associated with the user
         */
        public UserInfo(Integer userId, String userName, String address) {
            this.userId = userId;
            this.userName = userName;
            this.address = address;
        }

    }
}